2023年7月版 EMR on EC2のチュートリアルを試しました
4年近くEMRを触っていなかったので久しぶりにEMRを触ることにしました。ということで、まずはEMR on EC2の Tutorial: Getting started with Amazon EMR - Amazon EMR を試してみることにしました。
前提
- EMR version: emr-6.11.0
- Applicaiton: Spark(実際には PySpark を利用する)
- ハードウェア構成: m6a.xlarge を 1 台(検証用なのでプライマリーノードのみ)
- リージョン: 東京リージョン
- VPC: Default VPC
- EMR service role: EMR_DefaultRole
- EC2 instance profile: EMR_EC2_DefaultRole
- AWS CLI: aws-cli/2.12.6 Python/3.11.4 Linux/4.14.255-314-253.539.amzn2.x86_64 exec-env/CloudShell exe/x86_64.amzn.2 prompt/off(CloudShell を利用)
S3バケットのフォルダ構成
検証用にS3バケットを用意します。DOC-EXAMPLE-BUCKET
の箇所を自身の環境に読み替えてください。health_violations.py が PySpark のスクリプト、food_establishment_data.csv が入力ファイル、myOutputFolder が処理結果の出力先フォルダになります。
- s3://DOC-EXAMPLE-BUCKET/health_violations.py
- s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv
- s3://DOC-EXAMPLE-BUCKET/myOutputFolder
事前準備
ドキュメントに記載されている通り health_violations.py と food_establishment_data.csv を用意してS3バケットにアップロードします。
health_violations.py
処理内容は violation_type = 'RED'
であるレコードについて name と total_red_violations で集約して、降順にソートした上で先頭の10レコードを出力するという内容です。検証用に利用するS3バケットの直下にアップロードします。
import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri)
food_establishment_data.csv
food_establishment_data.zip からダウンロードします。解凍した food_establishment_data.csv を検証用に利用するS3バケット直下にアップロードします。中身はワシントン州キング郡における保健局の2006年から2020年の検査結果です。以下のようなデータが入っています。
name, inspection_result, inspection_closed_business, violation_type, violation_points 100 LB CLAM, Unsatisfactory, FALSE, BLUE, 5 100 PERCENT NUTRICION, Unsatisfactory, FALSE, BLUE, 5 7-ELEVEN #2361-39423A, Complete, FALSE, , 0
EMRクラスターの作成
手順の再現が容易なのでCLIベースで進めます。まずはIAM Roleを作成します。次にEMRのクラスターを作成します。
IAM Roleの作成
EMRクラスターで利用する各種IAM Roleを作成します。
aws emr create-default-roles
コマンドの詳細は create-default-roles — AWS CLI 2.12.7 Command Reference を参照してください。なお、すでに作成済みの場合はこの手順は不要です。また、再実行しても特に問題はありません。以下の3つのIAM Roleが作成されます。
- EMR_EC2_DefaultRole
- EMR_DefaultRole
- EMR_AutoScaling_DefaultRole
EMRクラスターの作成
次にEMRクラスターを作成します。
aws emr create-cluster \ --name "My First EMR Cluster" \ --release-label emr-6.11.0 \ --applications Name=Spark \ --instance-type m6a.xlarge \ --instance-count 1 \ --use-default-roles
コマンドの詳細は create-cluster — AWS CLI 2.12.7 Command Reference を参照してください。公式ドキュメントとの違いは以下のとおりです。
- instance-type: 更に安い m6a.xlarge に変更した
- instance-count: プライマリーノードのみの構成にするため 1 に変更した
- --ec2-attributes KeyName: SSHで接続しないため記述自体を削除した
実行すると以下のような内容が出力されます。
{ "ClusterId": "j-XXXXXXXX", "ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:XXXXXXXX:cluster/j-XXXXXXX" }
クラスターの状態の確認
作成したクラスターの状態を確認します。<myClusterId>
の箇所に create-cluster
コマンドの処理結果の ClusterId
の箇所の値を入れてください。
aws emr describe-cluster --cluster-id <myClusterId>
実行すると以下のような内容が出力されます。コマンドの結果が長いため一部割愛しています。ポイントは State が WAITING
になっていることです。WAITING
はアプリケーションの実行待ちの状態なのでジョブを送信すれば実行されます。
{ "Cluster": { "Id": "j-XXXXXXXX", "Name": "My First EMR Cluster", "Status": { "State": "WAITING", "StateChangeReason": { "Message": "Cluster ready to run steps." }, "Timeline": { "CreationDateTime": "2023-07-07T10:45:48.757000+00:00", "ReadyDateTime": "2023-07-07T10:50:13.655000+00:00" } }, ...
もしくはクラスターの一覧を出力するコマンドで WAITING
のもののみ出力するという確認の仕方もあります。
aws emr list-clusters --cluster-states WAITING
実行して以下のような内容が出力されればクラスターが WAITING
になっていると判断できます。複数のクラスターを起動している場合は id
が今回起動したクラスターのIDであることを確認してください。
{ "Clusters": [ { "Id": "j-XXXXXXX", "Name": "My First EMR Cluster", "Status": { "State": "WAITING", "StateChangeReason": { "Message": "Cluster ready to run steps." }, "Timeline": { "CreationDateTime": "2023-07-07T10:45:48.757000+00:00", "ReadyDateTime": "2023-07-07T10:50:13.655000+00:00" } }, "NormalizedInstanceHours": 0, "ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:XXXXXXX:cluster/j-XXXXXXX" } ] }
ジョブの送信と結果の確認
待機中のEMRクラスターにジョブを送信しS3バケットに出力される処理結果を確認します。
ジョブの送信
クラスターにジョブを送信することで実際に処理を実行させることができます。<>
で囲まれている箇所を自身の環境に書き換えてください。DOC-EXAMPLE-BUCKET
の箇所は検証用S3バケットになります。また書き換えた際に <>
の文字の削除を忘れないようにしてください。
aws emr add-steps \ --cluster-id <myClusterId> \ --steps Type=Spark,Name="My Spark Application",ActionOnFailure=CONTINUE,Args=[<s3://DOC-EXAMPLE-BUCKET/health_violations.py>,--data_source,<s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv>,--output_uri,<s3://DOC-EXAMPLE-BUCKET/MyOutputFolder>]
実行すると以下のような内容が出力されます。コマンド名が add-steps
であったようにEMRでは個々のジョブのことを Step と呼びます。
{ "StepIds": [ "s-XXXXXXXXXXXXX" ] }
STEPの処理状況の確認
cluster-id と step-id を指定することでSTEPの処理状況を確認できます。
aws emr describe-step --cluster-id <myClusterId> --step-id <s-1XXXXXXXXXXA>
実行すると以下のような内容が出力されます。State が COMPLETED
になれば処理終了です。RUNNING
は処理中を意味します。COMPLETED
になるまで何度かコマンドを実行して処理状況を確認してください。
{ "Step": { "Id": "s-XXXXXXXX", "Name": "My Spark Application", "Config": { "Jar": "command-runner.jar", "Properties": {}, "Args": [ "spark-submit", "s3://XXXXXXXX/health_violations.py", "--data_source", "s3://XXXXXXXX/food_establishment_data.csv", "--output_uri", "s3://XXXXXXXX/MyOutputFolder" ] }, "ActionOnFailure": "CONTINUE", "Status": { "State": "RUNNING", "StateChangeReason": {}, "Timeline": { "CreationDateTime": "2023-07-07T11:03:54.358000+00:00", "StartDateTime": "2023-07-07T11:04:11.121000+00:00" } } } }
結果の確認
検証用に用意したS3バケットの MyOutputFolder
配下に処理結果のCSVファイルが出力されます。part-00000
から始まるファイルです。
CSVファイルの中身は以下のとおりです。
name,total_red_violations SUBWAY,322 T-MOBILE PARK,315 WHOLE FOODS MARKET,299 PCC COMMUNITY MARKETS,251 TACO TIME,240 MCDONALD'S,177 THAI GINGER,153 SAFEWAY INC #1508,143 TAQUERIA EL RINCONSITO,134 HIMITSU TERIYAKI,128
クラスターの削除と後処理
以上でチュートリアルは終了です。起動したEMRクラスターと検証用のS3バケットを削除します。
クラスターの削除
クラスターを削除しないと利用費が発生し続けるため削除します。なお、このコマンドは処理に成功した場合は処理結果として何も出力されません。
aws emr terminate-clusters --cluster-ids <myClusterId>
クラスター作成時にも利用した describe-cluster
コマンドを実行してクラスターの状態を確認します。
aws emr describe-cluster --cluster-id <myClusterId>
State が TERMINATED
になれば削除完了です。
{ "Cluster": { "Id": "j-XXXXX", "Name": "My First EMR Cluster", "Status": { "State": "TERMINATED", "StateChangeReason": { "Code": "USER_REQUEST", "Message": "Terminated by user request" }, "Timeline": { "CreationDateTime": "2023-07-07T10:45:48.757000+00:00", "ReadyDateTime": "2023-07-07T10:50:13.655000+00:00", "EndDateTime": "2023-07-07T11:22:50.819000+00:00" } }, ...
検証用S3バケットの削除
必要に応じて検証用に作成したS3バケットを削除してください。
最後に
4年ぶりに触ってみましたが基本的な使い方は変わっていないことを確認できました。EMRの利用方法は他にもEMR on EKSやEMR Serverlessも増えているのでそれぞれ触って違いを確認できればと考えています。